package com.menghao.rpc.netty;

import com.menghao.rpc.NamedThreadFactory;
import com.menghao.rpc.RpcConstants;
import com.menghao.rpc.consumer.model.RpcRequest;
import com.menghao.rpc.netty.in.TcpInboundHandler;
import com.menghao.rpc.netty.in.TcpMessageDecoder;
import com.menghao.rpc.netty.out.TcpMessageEncoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

/**
 * <p>借助Netty实现TCP服务端.<br>
 *
 * @author MarvelCode.
 */
public class TcpServer {

    private static Logger LOGGER = LoggerFactory.getLogger(TcpServer.class);

    private int exceptPort;

    private int actualPort;

    private int maxFrameLength;
    private int readIdle;
    private int writIdle;
    private List<TcpMessageHandler> messageHandlers;

    private EventLoopGroup bossGroup;

    private EventLoopGroup workerGroup;

    public TcpServer(int exceptPort, int maxFrameLength, int readIdle, int writIdle, List<TcpMessageHandler> messageHandlers) {
        this.exceptPort = exceptPort;
        this.maxFrameLength = maxFrameLength;
        this.readIdle = readIdle;
        this.writIdle = writIdle;
        this.messageHandlers = messageHandlers;
    }

    private ServerBootstrap initBootstrap() {
        ServerBootstrap bootstrap = new ServerBootstrap();
        bossGroup = new NioEventLoopGroup(1, new NamedThreadFactory("netty-server-select", true));
        workerGroup = new NioEventLoopGroup(0, new NamedThreadFactory("netty-server-io", true));
        // 设置轮询线程、处理线程池
        bootstrap.group(bossGroup, workerGroup)
                //  标识当服务器请求处理线程全满时，用于临时存放已完成三次握手的请求的队列的最大长度
                .option(ChannelOption.SO_BACKLOG, 1000)
                // 是否允许一个地址重复绑定
                .option(ChannelOption.SO_REUSEADDR, true)
                // 是否使用Nagle的算法以尽可能发送大块数据
                .childOption(ChannelOption.TCP_NODELAY, true)
                // 是否启动心跳保活机制（长连接）
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                // 基于内存池的缓冲区重用机制
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childOption(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer() {
                    @Override
                    protected void initChannel(Channel ch) throws Exception {
                        // 服务端需要序列化 rpcResponse、反序列化 rpcRequest
                        ch.pipeline().addLast(new TcpMessageEncoder())
                                .addLast(new TcpMessageDecoder(maxFrameLength, RpcRequest.class))
                                .addLast(new IdleStateHandler(readIdle, writIdle, 0))
                                .addLast(new TcpInboundHandler(messageHandlers));
                    }
                });
        return bootstrap;
    }

    public void start() {
        ServerBootstrap bootstrap = initBootstrap();
        while (true) {
            ChannelFuture future = bootstrap.bind(exceptPort);
            future.awaitUninterruptibly();
            if (future.isSuccess()) {
                this.actualPort = exceptPort;
                LOGGER.info(RpcConstants.LOG_RPC_PREFIX + "bootstrap bind port {}", actualPort);
                break;
            }
            exceptPort++;
            LOGGER.info(RpcConstants.LOG_RPC_PREFIX + "bootstrap bind port {} fail, retry bind port {}.", exceptPort, exceptPort + 1);
        }
    }

    public void close() {
        if (this.bossGroup != null) {
            this.bossGroup.shutdownGracefully();
        }

        if (this.workerGroup != null) {
            this.workerGroup.shutdownGracefully();
        }
    }

    public int getPort() {
        return actualPort;
    }
}
